Kafka(七)

您所在的位置:网站首页 kafka指定时间戳消费 命令行 Kafka(七)

Kafka(七)

2023-09-17 02:58| 来源: 网络整理| 查看: 265

本篇博客要点如下:

一.环境准备

二.Kafka2.1.1常用命令总结

启动Kafka停止Kafka查看kafka消息列表创建topic删除topic消息生产消息消费查看某个topic详情修改分区数

三.可能会用到的其它命令

通过命令行的方式修改Kafka topic配置快速定位某个topic异常的分区消费端从任意指定的偏移量开始消费数据使用指定的消费者组进行消费

四.参考资料

一.环境准备 1. 本博客所列举的所有命令,均针对于Kafka2.0.0及以上版本,并不适用于低版本 2. 本博客所有命令,均是在正确配置好KAFKA_HOM 以及在PATH下面添加Kafka bin目录 PATH=$PATH:$KAFKA_HOME/bin 的基础上执行的 否则,需要去到kafka安装目录的bin目录下,执行相关脚本 二.Kafka2.1.1常用命令总结 启动Kafka

Kafka的正常运行,离不开Zookeeper提供的协调服务,因此首先要启动Zookeeper

zkServer.sh start

在这里插入图片描述 接下来启动kafka

#启动Kafka,指定配置文件为与bin目录同级目录config目录下的server.properties 文件 kafka-server-start.sh -daemon ../config/server.properties

使用jps命令,查看启动情况:

jps

如下图所示,证明Kafka启动成功 在这里插入图片描述

停止Kafka 执行命令 : kafka-server-stop.sh

使用jps命令查看,已经没有Kakfa相关进程 在这里插入图片描述

查看kafka消息列表 # 其中 linux01指的是主机名, 2181是zookeeper的端口号 kafka-topics.sh --zookeeper linux01:2181 --list

运行结果如下 : 在这里插入图片描述

创建topic # 创建一个具有3个副本,2个分区的topic,topic命名为 first kafka-topics.sh --zookeeper linux01:2181 --create --replication-factor 3 --partitions 2 --topic first

在这里插入图片描述

删除topic # 删除名为first的topic kafka-topics.sh --zookeeper linux01:2181 --delete --topic first

执行得到如下提示 :

#topic first被标记删除 #但如果server.properties中未设置delete.topic.enable=true,不会有任何影响 Topic first is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.

我们将server.properties中设置delete.topic.enable=true, 再来执行这个命令,看看结果如何 在这里插入图片描述 在这里插入图片描述 通过上述两张图片我们看到,经过配置文件修改之后,上述topic已经被删除

消息生产 # 启动topic test的生产端 kafka-console-producer.sh --broker-list linux01:9092 --topic test

之后就可以在弹出来的输入框中生产消息 在这里插入图片描述

消息消费 #从头开始消费topic test kafka-console-consumer.sh --bootstrap-server linux01:9092 --from-beginning --topic test

执行情况如下图: 在这里插入图片描述如果去掉参数 --from-beginning消费呢?

# 继续之前的偏移量开始消费 kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic test

我们在生产端生产一条消息 test

从下图中我们可以得知,如果不指定参数,是继续之前的偏移量消费 在这里插入图片描述

查看某个topic详情 # 查看topic test的详细情况 kafka-topics.sh --zookeeper linux01:2181 --describe --topic test

运行结果如下图所示 在这里插入图片描述

其中 : 第一行是对所有分区的一个总结 topic test 总共拥有1个分区,复制因子为1 从第二行开始,是对每个分区的具体描述,因为我们这个topic只有一个分区,所以只显示一行 leader 是在给出的所有partitons中负责读写的节点,每个节点都有可能成为leader replicas 显示给定partiton所有副本所存储节点的节点列表,不管该节点是否是leader或者是否存活。 isr 副本都已同步的的节点集合,这个集合中的所有节点都是存活状态,并且跟leader同步 在这里即 : 分区0的 leader为 broker.id=3的节点, 同时副本也是broker.id=3的节点(因为复制因子为1) broker.id=3的副本跟leader同步 修改分区数 # 把topic test 的分区设置为4个 kafka-topics.sh --zookeeper linux01:2181 --alter --topic test --partitions 4

运行结果如下:

# 这里给了一个警告 : 如果为具有关键字的主题增加了分区,则分区逻辑或消息的顺序将受到影响。 # 如果我们的业务场景对消息的顺序有着严格的要求, 一定要谨慎添加分区! 建议将之前所有的消息全部消费完才执行添加分区操作 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!

上面我们的分区数从1,添加到了4, 成功了 如果我们尝试减少分区数,会发生什么呢?

# 首先给出了警告 # 然后报错,Kafka的分区数,只能增加,不能减少 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Error while executing topic command : The number of partitions for a topic can only be increased. Topic test currently has 4 partitions, 2 would not be an increase. [2020-02-24 16:52:20,845] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitionsfor a topic can only be increased. Topic test currently has 4 partitions, 2 would not be an increase. (kafka.admin.TopicCommand$)

那么我们来思考一下,为什么Kafka的分区数不能减少?

假设Kafka的分区数可以减少.那么首先就有一个需要考虑的问题,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留那么该如何保留呢? 如果直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响; 如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何呢?

从技术的角度来说,减少Kafka的分区数目应该不难,但是因为这个小小的功能所带来的一系列麻烦确实在是过于恐怖,索性不提供这样的功能出来

三.可能会用到的其它命令 通过命令行的方式修改Kafka topic配置

我们可以通过 --config 参数来对正在create 或者 alter的主题进行配置的覆盖, 使创建的主题更能满足我们实际业务的需求

比如说: Kafka默认允许的最大消息大小是1M,但是对于视频或者图片信息来说, 我们认为这1M远远达不到实际需要,这个时候,就可以采用下面的命令:

# 将topic test的允许最大消息大小设置为10M kafka-topics.sh --zookeeper linux02:2181 --alter --topic test --config max.message.bytes=10485760

除了上述举例的参数,以下这些参数也是允许在创建或者更改的时候覆盖的

# configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. # replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable 快速定位某个topic异常的分区

假设我们的某一个topic业务量很庞大,分区数目很多. 忽然有一天,Kafka的某个节点挂了,我们就可以通过下面的命令来找出 有哪些分区的leader是不可用的

# 通过unavailable-partitions参数定位出 topic test leader不可用的分区 kafka-topics.sh --zookeeper linux01:2181 --describe --topic test --unavailable-partitions

因为我集群里所有的节点都是正常的,所以执行这个命令什么都没有显示: 在这里插入图片描述

消费端从任意指定的偏移量开始消费数据 # 从偏移量为2开始消费 topic test 分区为0 的数据 # 注意,指定偏移量的同时,一定也要指定分区 kafka-console-consumer.sh --bootstrap-server linux01:9092 --offset 2 --topic test --partition 0

消息生产详情: 在这里插入图片描述 指定偏移量消费详情:

在这里插入图片描述

使用指定的消费者组进行消费 # 使用消费者组xmr 从最早的偏移量消费 topic test的数据 kafka-console-consumer.sh --bootstrap-server linux01:9092 --group xmr --from-beginning --topic test

运行情况如下图所示: 在这里插入图片描述

参考资料

Kafka官方网站



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3